/*
Copyright IBM Corp. 2017 All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

		 http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package transientstore

import (
	"errors"

	commonledger "github.com/hyperledger/fabric/common/ledger"
	"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
	"github.com/syndtr/goleveldb/leveldb/iterator"
)

// emptyValue is the placeholder stored as the value of purge-index entries;
// Persist writes it under the purge-index key.
var emptyValue = []byte{}

// ErrStoreEmpty is used to indicate that there are no entries in the transient store.
// It is returned by GetMinEndorsementBlkHt when no purge-index entry is found.
var ErrStoreEmpty = errors.New("Transient store is empty")

//////////////////////////////////////////////
// Interfaces and data types
/////////////////////////////////////////////

// StoreProvider provides an instance of a transient Store.
type StoreProvider interface {
	// OpenStore returns a Store handle for the given ledgerID.
	OpenStore(ledgerID string) (Store, error)
	// Close closes the provider.
	Close()
}

// Store manages the storage of private read-write sets for a ledgerId.
// Ideally, a ledger can remove the data from this storage once it has been committed to
// the permanent storage, or when the pruning of some data items is enforced by policy.
type Store interface {
	// Persist stores the private read-write set of a transaction in the transient store
	Persist(txid string, endorserid string, endorsementBlkHt uint64, privateSimulationResults []byte) error
	// GetTxPvtRWSetByTxid returns an iterator due to the fact that the txid may have multiple private
	// RWSets persisted from different endorsers (via Gossip)
	GetTxPvtRWSetByTxid(txid string) (commonledger.ResultsIterator, error)
	// GetSelfSimulatedTxPvtRWSetByTxid returns the private read-write set generated from the simulation
	// performed by the peer itself
	GetSelfSimulatedTxPvtRWSetByTxid(txid string) (*EndorserPvtSimulationResults, error)
	// Purge removes private read-write sets generated by endorsers at a block height lower than
	// a given maxBlockNumToRetain. In other words, Purge only retains private read-write sets
	// that were generated at a block height of maxBlockNumToRetain or higher.
	Purge(maxBlockNumToRetain uint64) error
	// GetMinEndorsementBlkHt returns the lowest retained endorsement block height
	GetMinEndorsementBlkHt() (uint64, error)
	// Shutdown is the hook for shutting the store down. The implementation in this
	// file does nothing because a shared db is used.
	Shutdown()
}

// EndorserPvtSimulationResults captures the details of the simulation results specific to an endorser
type EndorserPvtSimulationResults struct {
	// EndorserID identifies the endorser that produced the results; an empty
	// EndorserID marks results treated as simulated by the peer itself.
	EndorserID             string
	// EndorsementBlockHeight is the endorsement block height recorded with the results.
	EndorsementBlockHeight uint64
	// PvtSimulationResults holds the private simulation results as raw bytes.
	PvtSimulationResults   []byte
}

//////////////////////////////////////////////
// Implementation
/////////////////////////////////////////////

// storeProvider encapsulates a leveldb provider which is used to store
// private read-write sets of simulated transactions, and implements the
// StoreProvider interface.
type storeProvider struct {
	// dbProvider hands out per-ledger db handles (see OpenStore).
	dbProvider *leveldbhelper.Provider
}

// store holds a leveldb handle for a single ledger and implements the Store interface.
type store struct {
	// db is the handle obtained from the provider for ledgerID.
	db       *leveldbhelper.DBHandle
	// ledgerID is the ledger this store was opened for.
	ledgerID string
}

// rwsetScanner iterates over the private read-write sets persisted for a txid.
// It is the iterator returned by GetTxPvtRWSetByTxid.
type rwsetScanner struct {
	// txid is the transaction id the scan was opened for.
	txid  string
	// dbItr is the underlying leveldb iterator over the txid's key range.
	dbItr iterator.Iterator
}

// NewStoreProvider instantiates a StoreProvider whose leveldb lives at the
// path returned by GetTransientStorePath.
func NewStoreProvider() StoreProvider {
	conf := leveldbhelper.Conf{DBPath: GetTransientStorePath()}
	return &storeProvider{dbProvider: leveldbhelper.NewProvider(&conf)}
}

// NewCustomPathStoreProvider constructs a StoreProvider whose leveldb lives at the given path.
// The ledger uses this function to construct a dedicated store until the store is managed
// by the transient coordinator. A separate store location for the ledger is desired so that
// the coordinator can be developed independently to manage the transient store at the default
// location. Once the coordinator is developed, the ledger can stop using the transient store
// and this function can be removed.
func NewCustomPathStoreProvider(path string) StoreProvider {
	conf := leveldbhelper.Conf{DBPath: path}
	return &storeProvider{dbProvider: leveldbhelper.NewProvider(&conf)}
}

// OpenStore returns a Store bound to the db handle of the given ledgerID.
// The returned error is always nil.
func (provider *storeProvider) OpenStore(ledgerID string) (Store, error) {
	opened := &store{ledgerID: ledgerID}
	opened.db = provider.dbProvider.GetDBHandle(ledgerID)
	return opened, nil
}

// Close closes the StoreProvider by closing its underlying leveldb provider.
func (provider *storeProvider) Close() {
	provider.dbProvider.Close()
}

// Persist stores the private read-write set of a transaction in the transient store.
// Two entries are written in one batch: the read-write set itself, keyed by
// (txid, endorserid, endorsementBlkHt), and a purge-index entry, keyed by
// (endorsementBlkHt, txid, endorserid), whose value is empty.
func (s *store) Persist(txid string, endorserid string,
	endorsementBlkHt uint64, privateSimulationResults []byte) error {
	batch := leveldbhelper.NewUpdateBatch()

	// Data entry: the simulation results under the pvt RWSet composite key.
	pvtRWSetKey := createCompositeKeyForPvtRWSet(txid, endorserid, endorsementBlkHt)
	batch.Put(pvtRWSetKey, privateSimulationResults)

	// Index entry: lets Purge locate the data entry by endorsement block height.
	purgeIndexKey := createCompositeKeyForPurgeIndex(endorsementBlkHt, txid, endorserid)
	batch.Put(purgeIndexKey, emptyValue)

	return s.db.WriteBatch(batch, true)
}

// GetTxPvtRWSetByTxid returns an iterator because a txid may have multiple private
// RWSets persisted from different endorserids. Eventually, we may pass a set of collection
// names (filters) along with txid. The returned error is always nil.
func (s *store) GetTxPvtRWSetByTxid(txid string) (commonledger.ResultsIterator, error) {
	// Range query bounded by the start and end keys derived from txid.
	dbItr := s.db.GetIterator(createTxidRangeStartKey(txid), createTxidRangeEndKey(txid))
	scanner := &rwsetScanner{txid: txid, dbItr: dbItr}
	return scanner, nil
}

// GetSelfSimulatedTxPvtRWSetByTxid returns the private write set generated from the simulation
// performed by the peer itself, i.e. the first entry for txid that selfSimulated accepts.
// It returns (nil, nil) when no such entry exists. Eventually, we may pass a set of collection
// names (filters) along with txid.
func (s *store) GetSelfSimulatedTxPvtRWSetByTxid(txid string) (*EndorserPvtSimulationResults, error) {
	itr, err := s.GetTxPvtRWSetByTxid(txid)
	if err != nil {
		return nil, err
	}
	defer itr.Close()

	for {
		queryResult, err := itr.Next()
		if err != nil {
			return nil, err
		}
		if queryResult == nil {
			// Iterator exhausted without finding a self-simulated entry.
			return nil, nil
		}
		if candidate := queryResult.(*EndorserPvtSimulationResults); selfSimulated(candidate) {
			return candidate, nil
		}
	}
}

// Purge removes private read-write sets generated by endorsers at a block height lower than
// the given maxBlockNumToRetain. In other words, Purge only retains private read-write sets
// that were generated at a block height of maxBlockNumToRetain or higher.
//
// Both the read-write set entries and their purge-index entries are deleted through one
// update batch. An error is returned if iterating the purge index or writing the batch fails;
// in the former case nothing is written.
func (s *store) Purge(maxBlockNumToRetain uint64) error {
	// No entry can be below height 0, so there is nothing to purge. Returning early also
	// avoids the uint64 wrap-around of maxBlockNumToRetain-1, which would otherwise build
	// the end key for the maximum block height.
	if maxBlockNumToRetain == 0 {
		return nil
	}

	// Do a range query on the purge index with 0 as startKey and maxBlockNumToRetain-1 as endKey
	startKey := createEndorsementBlkHtRangeStartKey(0)
	endKey := createEndorsementBlkHtRangeEndKey(maxBlockNumToRetain - 1)
	iter := s.db.GetIterator(startKey, endKey)
	// Release the iterator on every return path so it is not leaked.
	defer iter.Release()

	dbBatch := leveldbhelper.NewUpdateBatch()

	// Get all txid and endorserid from above result and remove them from the transient store
	// (both the read/write set and the corresponding index entry).
	for iter.Next() {
		dbKey := iter.Key()
		txid, endorserid, endorsementBlkHt := splitCompositeKeyOfPurgeIndex(dbKey)
		compositeKey := createCompositeKeyForPvtRWSet(txid, endorserid, endorsementBlkHt)
		dbBatch.Delete(compositeKey)
		dbBatch.Delete(dbKey)
	}
	// Next also returns false when iteration fails; surface that instead of
	// silently committing a partial purge.
	if err := iter.Error(); err != nil {
		return err
	}
	return s.db.WriteBatch(dbBatch, true)
}

// GetMinEndorsementBlkHt returns the lowest retained endorsement block height.
// It returns ErrStoreEmpty when no entry is found, and the iterator's error when
// the lookup itself fails.
func (s *store) GetMinEndorsementBlkHt() (uint64, error) {
	// Current approach performs a range query on purgeIndex with startKey
	// as 0 (i.e., endorsementBlkHt) and returns the first key which denotes
	// the lowest retained endorsement block height. An alternative approach
	// is to explicitly store the minEndorsementBlkHt in the transientStore.
	startKey := createEndorsementBlkHtRangeStartKey(0)
	iter := s.db.GetIterator(startKey, nil)
	// Release the iterator on every return path so it is not leaked.
	defer iter.Release()

	// Fetch the minimum endorsement block height
	if iter.Next() {
		dbKey := iter.Key()
		_, _, endorsementBlkHt := splitCompositeKeyOfPurgeIndex(dbKey)
		return endorsementBlkHt, nil
	}
	// Next returns false both when there are no entries and when iteration
	// failed; do not report a failure as an empty store.
	if err := iter.Error(); err != nil {
		return 0, err
	}
	// Returning an error may not be the right thing to do here. May be
	// return a bool. -1 is not possible due to unsigned int as first
	// return value
	return 0, ErrStoreEmpty
}

// Shutdown implements Store. It is intentionally a no-op: the store does not
// close its db handle here.
func (s *store) Shutdown() {
	// do nothing because shared db is used
}

// Next moves the iterator to the next key/value pair and returns it as an
// *EndorserPvtSimulationResults. It returns (nil, nil) once the iterator is
// exhausted, and (nil, err) if the underlying iterator reports an error.
func (scanner *rwsetScanner) Next() (commonledger.QueryResult, error) {
	if !scanner.dbItr.Next() {
		// Next is false both at the end of the range and on failure;
		// Error is nil in the former case.
		return nil, scanner.dbItr.Error()
	}
	dbKey := scanner.dbItr.Key()
	dbVal := scanner.dbItr.Value()
	endorserid, endorsementBlkHt := splitCompositeKeyOfPvtRWSet(dbKey)

	// The slice returned by Value may have its contents changed by the next
	// seek on the iterator, so hand the caller a private copy.
	pvtSimulationResults := make([]byte, len(dbVal))
	copy(pvtSimulationResults, dbVal)

	return &EndorserPvtSimulationResults{
		EndorserID:             endorserid,
		EndorsementBlockHeight: endorsementBlkHt,
		PvtSimulationResults:   pvtSimulationResults,
	}, nil
}

// Close releases the resources held by the scanner by releasing the underlying
// leveldb iterator.
func (scanner *rwsetScanner) Close() {
	scanner.dbItr.Release()
}

// selfSimulated reports whether the given results carry an empty EndorserID,
// which is how results simulated by the peer itself are recognised.
func selfSimulated(pvtSimRes *EndorserPvtSimulationResults) bool {
	return len(pvtSimRes.EndorserID) == 0
}
